/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.parquet.crypto.keytools;

import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.type.TypeReference;

/**
 * KeyMaterial class represents the "key material", keeping the information that allows readers to recover an encryption key (see 
 * description of the KeyMetadata class). The keytools package (PARQUET-1373) implements the "envelope encryption" pattern, in a 
 * "single wrapping" or "double wrapping" mode. In the single wrapping mode, the key material is generated by encrypting the 
 * "data encryption key" (DEK) by a "master key". In the double wrapping mode, the key material is generated by encrypting the DEK 
 * by a "key encryption key" (KEK), that in turn is encrypted by a "master key".
 * 
 * Key material is kept in a flat json object, with the following fields:
 * 1. "keyMaterialType" - a String, with the type of  key material. In the current version, only one value is allowed - "PKMT1" (stands 
 *     for "parquet key management tools, version 1"). For external key material storage, this field is written in both "key metadata" and 
 *     "key material" jsons. For internal key material storage, this field is written only once in the common json.
 * 2. "isFooterKey" - a boolean. If true, means that the material belongs to a file footer key, and keeps additional information (such as
 *     KMS instance ID and URL). If false, means that the material belongs to a column key.
 * 3. "kmsInstanceID" - a String, with the KMS Instance ID. Written only in footer key material.
 * 4. "kmsInstanceURL" - a String, with the KMS Instance URL. Written only in footer key material.
 * 5. "masterKeyID" - a String, with the ID of the master key used to generate the material.
 * 6. "wrappedDEK" - a String, with the wrapped DEK (base64 encoding).
 * 7. "doubleWrapping" - a boolean. If true, means that the material was generated in double wrapping mode. 
 *     If false - in single wrapping mode.
 * 8. "keyEncryptionKeyID" - a String, with the ID of the KEK used to generate the material. Written only in double wrapping mode.
 * 9. "wrappedKEK" - a String, with the wrapped KEK (base64 encoding). Written only in double wrapping mode.
 */
public class KeyMaterial {
  static final String KEY_MATERIAL_TYPE_FIELD = "keyMaterialType";
  static final String KEY_MATERIAL_TYPE1 = "PKMT1";

  static final String FOOTER_KEY_ID_IN_FILE = "footerKey";
  static final String COLUMN_KEY_ID_IN_FILE_PREFIX = "columnKey";

  private static final String IS_FOOTER_KEY_FIELD = "isFooterKey";
  private static final String DOUBLE_WRAPPING_FIELD = "doubleWrapping";
  private static final String KMS_INSTANCE_ID_FIELD = "kmsInstanceID";
  private static final String KMS_INSTANCE_URL_FIELD = "kmsInstanceURL";
  private static final String MASTER_KEY_ID_FIELD = "masterKeyID";
  private static final String WRAPPED_DEK_FIELD = "wrappedDEK";
  private static final String KEK_ID_FIELD = "keyEncryptionKeyID";
  private static final String WRAPPED_KEK_FIELD = "wrappedKEK";

  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

  private final boolean isFooterKey;
  private final String kmsInstanceID;
  private final String kmsInstanceURL;
  private final String masterKeyID;
  private final boolean isDoubleWrapped;
  private final String kekID;
  private final String encodedWrappedKEK;
  private final String encodedWrappedDEK;

  private KeyMaterial(boolean isFooterKey, String kmsInstanceID, String kmsInstanceURL, String masterKeyID, 
      boolean isDoubleWrapped, String kekID, String encodedWrappedKEK, String encodedWrappedDEK) {
    this.isFooterKey = isFooterKey;
    this.kmsInstanceID = kmsInstanceID;
    this.kmsInstanceURL = kmsInstanceURL;
    this.masterKeyID = masterKeyID;
    this.isDoubleWrapped = isDoubleWrapped;
    this.kekID = kekID;
    this.encodedWrappedKEK = encodedWrappedKEK;
    this.encodedWrappedDEK = encodedWrappedDEK;
  }

  // parses external key material
  static KeyMaterial parse(String keyMaterialString) {
    Map<String, Object> keyMaterialJson = null;
    try {
      keyMaterialJson = OBJECT_MAPPER.readValue(new StringReader(keyMaterialString),
          new TypeReference<Map<String, Object>>() {});
    } catch (IOException e) {
      throw new ParquetCryptoRuntimeException("Failed to parse key metadata " + keyMaterialString, e);
    }
    // 1. External key material - extract "key material type", and make sure it is supported
    String keyMaterialType = (String) keyMaterialJson.get(KEY_MATERIAL_TYPE_FIELD);
    if (!KEY_MATERIAL_TYPE1.equals(keyMaterialType)) {
      throw new ParquetCryptoRuntimeException("Wrong key material type: " + keyMaterialType + 
          " vs " + KEY_MATERIAL_TYPE1);
    }
    // Parse other fields (common to internal and external key material)
    return parse(keyMaterialJson);
  }

  // parses fields common to internal and external key material
  static KeyMaterial parse(Map<String, Object> keyMaterialJson) {
    // 2. Check if "key material" belongs to file footer key
    Boolean isFooterKey = (Boolean) keyMaterialJson.get(IS_FOOTER_KEY_FIELD);
    String kmsInstanceID = null;
    String kmsInstanceURL = null;
    if (isFooterKey) {
      // 3.  For footer key, extract KMS Instance ID
      kmsInstanceID = (String) keyMaterialJson.get(KMS_INSTANCE_ID_FIELD);
      // 4.  For footer key, extract KMS Instance URL
      kmsInstanceURL = (String) keyMaterialJson.get(KMS_INSTANCE_URL_FIELD);
    }
    // 5. Extract master key ID
    String masterKeyID = (String) keyMaterialJson.get(MASTER_KEY_ID_FIELD);
    // 6. Extract wrapped DEK
    String  encodedWrappedDEK = (String) keyMaterialJson.get(WRAPPED_DEK_FIELD);
    String kekID = null;
    String encodedWrappedKEK = null;
    // 7. Check if "key material" was generated in double wrapping mode
    Boolean isDoubleWrapped = (Boolean) keyMaterialJson.get(DOUBLE_WRAPPING_FIELD);
    if (isDoubleWrapped) {
      // 8. In double wrapping mode, extract KEK ID
      kekID = (String) keyMaterialJson.get(KEK_ID_FIELD);
      // 9. In double wrapping mode, extract wrapped KEK
      encodedWrappedKEK = (String) keyMaterialJson.get(WRAPPED_KEK_FIELD);
    }

    return new KeyMaterial(isFooterKey, kmsInstanceID, kmsInstanceURL, masterKeyID, isDoubleWrapped, kekID, encodedWrappedKEK, encodedWrappedDEK);
  }

  static String createSerialized(boolean isFooterKey, String kmsInstanceID, String kmsInstanceURL, String masterKeyID, 
      boolean isDoubleWrapped, String kekID, String encodedWrappedKEK, String encodedWrappedDEK, boolean isInternalStorage) {
    Map<String, Object> keyMaterialMap = new HashMap<String, Object>(10);
    // 1. Write "key material type"
    keyMaterialMap.put(KEY_MATERIAL_TYPE_FIELD, KEY_MATERIAL_TYPE1);
    if (isInternalStorage) {
      // for internal storage, key material and key metadata are the same.
      // adding the "internalStorage" field that belongs to KeyMetadata.
      keyMaterialMap.put(KeyMetadata.KEY_MATERIAL_INTERNAL_STORAGE_FIELD, Boolean.TRUE);
    }
    // 2. Write isFooterKey
    keyMaterialMap.put(IS_FOOTER_KEY_FIELD, Boolean.valueOf(isFooterKey));
    if (isFooterKey) {
      // 3. For footer key, write KMS Instance ID
      keyMaterialMap.put(KMS_INSTANCE_ID_FIELD, kmsInstanceID);
      // 4. For footer key, write KMS Instance URL
      keyMaterialMap.put(KMS_INSTANCE_URL_FIELD, kmsInstanceURL);
    }
    // 5. Write master key ID
    keyMaterialMap.put(MASTER_KEY_ID_FIELD, masterKeyID);
    // 6. Write wrapped DEK
    keyMaterialMap.put(WRAPPED_DEK_FIELD, encodedWrappedDEK);
    // 7. Write isDoubleWrapped
    keyMaterialMap.put(DOUBLE_WRAPPING_FIELD, Boolean.valueOf(isDoubleWrapped));
    if (isDoubleWrapped) {
      // 8. In double wrapping mode, write KEK ID
      keyMaterialMap.put(KEK_ID_FIELD, kekID);
      // 9. In double wrapping mode, write wrapped KEK
      keyMaterialMap.put(WRAPPED_KEK_FIELD, encodedWrappedKEK);
    }

    try {
      return OBJECT_MAPPER.writeValueAsString(keyMaterialMap);
    } catch (IOException e) {
      throw new ParquetCryptoRuntimeException("Failed to serialize key material", e);
    }
  }

  boolean isFooterKey() {
    return isFooterKey;
  }

  boolean isDoubleWrapped() {
    return isDoubleWrapped;
  }

  String getMasterKeyID() {
    return masterKeyID;
  }

  String getWrappedDEK() {
    return encodedWrappedDEK;
  }

  String getKekID() {
    return kekID;
  }

  String getWrappedKEK() {
    return encodedWrappedKEK;
  }

  String getKmsInstanceID() {
    return kmsInstanceID;
  }

  String getKmsInstanceURL() {
    return kmsInstanceURL;
  }
}